Spring Boot + Disruptor = 王炸!
👉 欢迎准备 Java 面试以及学习 Java 的同学加入我的 知识星球 ,干货很多!收费虽然是白菜价,但星球里的内容或许比你参加上万的培训班质量还要高。
👉 《Java 面试指北》 来啦!这是一份教你如何更高效地准备面试的小册,涵盖常见八股文(系统设计、常见框架、分布式、高并发 ......)、优质面经等内容。
Java面试指南网站:javaguide.cn
上一个项目用到了 Disruptor 来做内存消息队列,简单写了一篇文章,记录一下 。
Disruptor 介绍
Disruptor
是一个开源的高性能内存队列,由英国外汇交易公司 LMAX 开发的,获得了 2011 年的 Oracle 官方的 Duke's Choice Awards(Duke 选择大奖)。
“Duke 选择大奖”旨在表彰过去一年里全球个人或公司开发的、最具影响力的 Java 技术应用,由甲骨文公司主办。含金量非常高!
我专门找到了 Oracle 官方当年颁布获得 Duke's Choice Awards 项目的那篇文章(文章地址:https://blogs.oracle.com/java/post/and-the-winners-arethe-dukes-choice-award) 。从文中可以看出,同年获得此大奖荣誉的还有大名鼎鼎的 Netty 、JRebel 等项目。
并且,有一些知名的开源项目到了 Disruptor
,就比如性能强大的 Java 日志框架 Log4j 2[1] 和蚂蚁金服分布式链路跟踪组件 SOFATracer[2] 就是基于 Disruptor 来做的异步日志,相关阅读:蚂蚁金服分布式链路跟踪组件 SOFATracer 中 Disruptor 实践(含源码)[3]。
Disruptor
提供的功能类似于 Kafka
、RocketMQ
这类分布式队列,不过,其作为范围是 JVM(内存)。
Github 地址:https://github.com/LMAX-Exchange/disruptor 官方教程:https://lmax-exchange.github.io/disruptor/user-guide/index.html
Disruptor
解决了 JDK 内置线程安全队列的性能和内存安全问题。
JDK 中常见的线程安全的队列如下:
队列名字 | 锁 | 是否有界 |
---|---|---|
ArrayBlockingQueue | 加锁(ReentrantLock ) | 有界 |
LinkedBlockingQueue | 加锁(ReentrantLock ) | 有界 |
LinkedTransferQueue | 无锁(CAS ) | 无界 |
ConcurrentLinkedQueue | 无锁(CAS ) | 无界 |
从上表中可以看出:这些队列要不就是加锁有界,要不就是无锁无界。而加锁的的队列势必会影响性能,无界的队列又存在内存溢出的风险。
因此,一般情况下,我们都是不建议使用 JDK 内置线程安全队列。
Disruptor
就不一样了!它在无锁的情况下还能保证队列有界,并且还是线程安全的。
不过, Disruptor
的基本使用非常简单,我们最重要的还是要搞懂其原理,明白它是如何被设计成这么厉害的并发框架。
Disruptor 核心概念
Event :你可以把 Event 理解为存放在队列中等待消费的消息对象。 EventFactory :事件工厂用于生产事件,我们在初始化 Disruptor
类的时候需要用到。EventHandler :Event 在对应的 Handler 中被处理,你可以将其理解为生产消费者模型中的消费者。 EventProcessor :EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。 Disruptor :事件的生产和消费需要用到 Disruptor
对象。RingBuffer :RingBuffer(环形数组)用于保存事件。 WaitStrategy :等待策略。决定了没有事件可以消费的时候,事件消费者如何等待新事件的到来。 Producer :生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。 ProducerType :指定是单个事件发布者模式还是多个事件发布者模式(发布者和生产者的意思类似,我个人比较喜欢用发布者)。 Sequencer :Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
Disruptor 实战
我们要使用 Disruptor
实现一个最基本的生产消费模型的整个步骤是下面这样的(标准的生产消费者模型):
定义事件(Event) : 你可以把 Event 理解为存放在队列中等待消费的消息对象。 创建事件工厂 :事件工厂用于生产事件,我们在初始化 Disruptor
类的时候需要用到。创建处理事件的 Handler
:Event 在对应的 Handler 中被处理,你可以将其理解为生产消费者模型中的消费者。创建并启动 Disruptor
: 事件的生产和消费需要用到Disruptor
对象。发布事件 :发布的事件保存在 Disruptor
的环形数组中。关闭 Disruptor
:类似于线程池的关闭。
整个步骤看似比较复杂,其实,逻辑还是比较简单的。我们需要围绕事件(Event)和Disruptor
来做文章。
我们可以在 Mavan 仓库找到 Disruptor
的最新 jar 包。
Disruptor
的 Maven 仓库地址:https://search.maven.org/artifact/com.lmax/disruptor
Maven :
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
Gradle:
implementation 'com.lmax:disruptor:3.4.4'
1、定义事件
我们先来定义一个代表日志事件的类:LogEvent
。
事件中包含了一些和事件相关的属性,比如我们这里定义的 LogEvent
对象中就有一个用来表示日志消息内容的属性:message
。
/**
* 微信搜 JavaGuide 回复"面试突击"即可免费领取个人原创的 Java 面试手册
*
* @author Guide哥
**/
public class LogEvent {
private String message;
//省略了 Getter/Setter
}
我们这里只是为了演示,实际项目中,一个标准日志事件对象所包含的属性肯定不是只有一个 message
(可以参考 log4j2
对 Disruptor
的使用)。
2、创建事件工厂
创建一个工厂类 LogEventFactory
用来创建 LogEvent
对象。
LogEventFactory
继承 EventFactory
接口并实现了 newInstance()
方法 。
public class LogEventFactory implements EventFactory<LogEvent> {
@Override
public LogEvent newInstance() {
return new LogEvent();
}
}
3、创建处理事件的 Handler
创建一个用于处理后续发布的事件的类:LogEventHandler
。
LogEventHandler
继承 EventHandler
接口并实现了 onEvent()
方法 。
public class LogEventHandler implements EventHandler<LogEvent> {
@Override
public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) throws Exception {
System.out.println(logEvent.getMessage());
}
}
EventHandler
接口的 onEvent()
方法共有 3 个参数:
event
:待消费/处理的事件sequence
:正在处理的事件在环形数组(RingBuffer
)中的位置endOfBatch
: 表示这是否是来自环形数组(RingBuffer
)中一个批次的最后一个事件(批量处理事件)
public interface EventHandler<T>
void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}
4、初始化 Disruptor
我们这里定义一个方法用于获取 Disruptor
对象。
private static Disruptor<LogEvent> getLogEventDisruptor() {
// 创建 LogEvent 的工厂
LogEventFactory logEventFactory = new LogEventFactory();
// Disruptor 的 RingBuffer 缓存大小
int bufferSize = 1024 * 1024;
// 生产者的线程工厂
ThreadFactory threadFactory = new ThreadFactory() {
final AtomicInteger threadNum = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
}
};
//实例化 Disruptor
return new Disruptor<>(
logEventFactory,
bufferSize,
threadFactory,
// 单生产者
ProducerType.SINGLE,
// 阻塞等待策略
new BlockingWaitStrategy());
}
Disruptor
的推荐使用的构造函数如下:
public class Disruptor<T> {
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
......
}
我们需要传递 5 个参数:
eventFactory
: 我们自定义的时间工厂。ringBufferSize
: 指定RingBuffer
的容量大小。threadFactory
:自定义的线程工厂。Disruptor
的默认线程池是自定义的,我们只需要传入线程工厂即可。producerType
: 指定是单个事件发布者模式还是多个事件发布者模式(发布者和生产者的意思类似,我个人比较喜欢用发布者)。waitStrategy
: 等待策略,决定了没有事件可以消费的时候,事件消费者如何等待新事件的到来。
ProducerType
的源码如下,它是一个包含两个变量的枚举类型
public enum ProducerType
{
SINGLE,
MULTI
}
SINGLE
: 单个事件发布者模式,不需要保证线程安全。MULTI
:多个事件发布者模式,基于 CAS 来保证线程安全。
WaitStrategy
(等待策略)接口的实现类中只有两个方法:
waitFor()
: 等待新事件的到来。signalAllWhenBlocking()
: 唤醒所有等待的消费者。
public interface WaitStrategy
{
long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException, TimeoutException;
void signalAllWhenBlocking();
}
WaitStrategy
的实现类共有 8 个,也就是说共有 8 种等待策略可供选择。
除了上面介绍的这个构造函数之外,Disruptor
还有一个只有 3 个参数构造函数。
使用这个构造函数创建的 Disruptor
对象会默认使用 ProducerType.MULTI
(多个事件发布者模式)和 BlockingWaitStrategy
(阻塞等待策略) 。
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}
5、发布事件
//获取 Disruptor 对象
Disruptor<LogEvent> disruptor = getLogEventDisruptor();
//绑定处理事件的Handler对象
disruptor.handleEventsWith(new LogEventHandler());
//启动 Disruptor
disruptor.start();
//获取保存事件的环形数组(RingBuffer)
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
//发布 10w 个事件
for (int i = 1; i <= 100000; i++) {
// 通过调用 RingBuffer 的 next() 方法获取下一个空闲事件槽的序号
long sequence = ringBuffer.next();
try {
LogEvent logEvent = ringBuffer.get(sequence);
// 初始化 Event,对其赋值
logEvent.setMessage("这是第%d条日志消息".formatted(i));
} finally {
// 发布事件
ringBuffer.publish(sequence);
}
}
// 关闭 Disruptor
disruptor.shutdown();
上面的代码中,我们通过 Disruptor
的 handleEventsWith
方法来绑定处理事件的 Handler 对象。
Disruptor
可以设置多个处理事件的 Handler
,并且可以灵活的设置消费者的处理顺序,串行,并行都是可以的。
就比如下面的代码表示 Handler1
和 Handler2
是并行执行,最后再执行 Handler3
。
disruptor.handleEventsWith(new Handler1(), new Handler2()).handleEventsWith(new Handler3());
6、结果
这是第1条日志消息
这是第2条日志消息
这是第3条日志消息
......
这是第99999条日志消息
这是第100000条日志消息
从打印结果可以看出,我们发布的 10w 个事件已经成功被处理。
总结
Disruptor
提供的功能类似于 Kafka
、RocketMQ
这类分布式队列,不过,其作为范围是 JVM(内存)。
Github 地址:https://github.com/LMAX-Exchange/disruptor 官方教程:https://lmax-exchange.github.io/disruptor/user-guide/index.html
Disruptor
在无锁的情况下还能保证队列有界,并且还是线程安全的,性能非常强,比较适合单机场景需要使用生产者-消费者模式的项目。
参考资料
Log4j 2: https://github.com/apache/logging-log4j2
[2]SOFATracer: https://github.com/sofastack/sofa-tracer
[3]蚂蚁金服分布式链路跟踪组件 SOFATracer 中 Disruptor 实践(含源码): https://www.sofastack.tech/blog/sofa-trcaer-disruptor-practice/
········· END ··············
👉 欢迎准备 Java 面试以及学习 Java 的同学加入我的知识星球,干货很多!收费虽然是白菜价,但星球里的内容或许比你参加上万的培训班质量还要高。
👉 《Java 面试指北》来啦!这是一份教你如何更高效地准备面试的小册,涵盖常见八股文(系统设计、常见框架、分布式、高并发 ......)、优质面经等内容。
近期文章精选 :
《JavaGuide 面试突击版》 5.0 最新版下载 2023 秋招补录&春招信息汇总,再冲一把! 31.2k!这是我见过最强的后台管理系统 !! 面试 30 家公司,终于拿到 Offer !! 为什么说程序员是一个极度劳累的工作?
👉如果本文对你有帮助的话,欢迎 点赞&在看&分享 ,这对我继续分享&创作优质文章非常重要。非常感谢!